Overview
基于Java1.6, 前后的版本实现都有所不同.
确定入口
在通常情况下, 我们是配合线程池获取一个Future的:1
2
3
4Callable<Integer> callable = () -> {
return 1;
};
Future<Integer> future = Executors.newSingleThreadExecutor().submit(callable);
观察submit方法内部, 不考虑”池”的概念实际上等同于下面操作:
1 | RunnableFuture<Integer> runnableFuture = new FutureTask<>(callable); |
RunnableFuture本身实现了Runnable和Future. 从而可以确定FutureTask的两个重要入口方法: run, get.
Sync
观察FutureTask内部, 同样有一个Sync. 和CountDownLatch相似, FutureTask本身也是个共享的同步工具. 由于一个线程被unpark后, 会继续传播PROPAGATE, 所以当FutureTask中的任务执行完成后, 所有调用get方法的线程都会退出阻塞. CountDownLatch中state表示数量, 而在FutureTask则表示生命周期的状态:
- READY = 0 刚刚
new FutureTask后的状态. - RUNNING = 1
- RAN = 2 表示执行完毕
- CANCELED = 4
如果强行类比的话, 那么get就对应CountDownLatch的await. run/cancel则对应countDown.
get
调用关系如下:
FutureTask#get -> Sync#innerGet -> AQS#acquireSharedInterruptibly(0)
根据我们之前的了解, 如果没有tryAcquireShared(0)成功, 则当前线程会被挂起. 在此处, tryAcquireShared的参数是没有任何意义的, 所以形参被取名叫ignore.
1 | protected int tryAcquireShared(int ignore) { |
innerIsDone大致为判断了状态是否为RAN或者CANCELED, 也就是说, 当其他线程将state设置为这两种状态之后, 等待线程才能结束运行. 当等待线程被唤醒后, 会做两个判断, 代码非常简单:
1 | if (getState() == CANCELLED) |
run
FutureTask#run 中仅为调用 Sync#innerRun. 其中执行了callable#call, 没有异常则set, 反之则setException. 分别对应Sync#innerSet, Sync#innerSetException.
观察两个方法, 其中的重要内容就是把stateCAS为RAN, 并doReleaseShared(0). 在CountDownLatch中有提到过这个方法, 子类的重点在于tryReleaseShared是如何实现的, 一定是返回true的, 且内容非常简单粗暴:
1 | protected boolean tryReleaseShared(int ignore) { |
可以看到, 这个方法的参数同样是无意义的. 其中的runner指的是执行FutureTask的线程, 即调用set,setException的线程. 实际上刚刚的innerIsDone除了判断状态之外, 还要求runner == null, 对应地, 他在tryReleaseShared这里被置为null.
cancel
大体和set, setException一致, 都是将stateCAS为一个值, 此处为CANCELED. 并且执行doReleaseShared. 除次之外, 如果为cancel(true)还会根据参数对执行任务的线程进行interrupt. 但是我们知道interrupt不一定成功, 观察下面的栗子:
1 | public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { |
可以发现, cancel之后, get线程马上就得到响应, 抛出异常了. 而执行线程仍然要在等待几秒后执行完.